跳到主要内容

RocketMQ 学习03 顺序消息、延时消息、批量消息

顺序消息

消息有序指的是可以按照消息的发送顺序来消费(先进先出,FIFO)。RocketMQ 可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取 Round Robin 轮询方式把消息发送到不同的 queue(分区队列);而消费消息的时候从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序。

但是如果控制发送的顺序消息只 依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序。当发送和消费参与的 queue 只有一个,则是全局有序;

如果多个 queue 参与,则为分区有序,即相对每个 queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个 OrderId 获取到的肯定是同一个队列。

创建生成订单类

这里使用内部类

/**
* 订单的步骤
*/
@Data
private static class OrderStep {
private long orderId;
private String desc;
}

编写一个生成模拟订单数据的方法

/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L + i);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
}
return orderList;
}

生产者代码

public class OrderConsumptionTest {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址
producer.setNamesrvAddr("192.168.211.151:9876;192.168.211.152:9876");
//3.启动producer
producer.start();
//构建消息集合
List<OrderStep> orderSteps = new OrderConsumptionTest().buildOrders();
//发送消息
for (int i = 0; i < orderSteps.size(); i++) {
String body = orderSteps.get(i) + "";
Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes());
/**
* 参数一:消息对象
* 参数二:消息队列的选择器(选择使用哪个消息队列)
* 参数三:选择队列的业务标识(订单ID)
*/
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
/**
*
* @param mqs:队列集合
* @param msg:消息对象
* @param arg:业务标识的参数
* @return 选择的消息队列
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long orderId = (long) arg;
// 发送到消息队列里面,这里随便选个消息队列发送出去(取余)
// long index = orderId % mqs.size();
// return mqs.get((int) index);

// 这里每次只选择同一个队列,那么消费时也是按照这个队列的顺序消费的
// 反之,如果使用上面那种取余的方式,那么消费时分区有序,即一个线程只完成唯一标识的订单消息
return mqs.get(0);
}
}, orderSteps.get(i).getOrderId());

System.out.println("发送结果:" + sendResult);
}
producer.shutdown();
}

// 下面就是上面创建的内部类,和 buildOrders 方法...

注意上面的队列选择器,它每次都选择第一个消息队列

RocketMQ 采用了局部顺序一致性的机制,实现了单个队列中的消息严格有序。也就是说,如果想要保证顺序消费,必须将一组消息发送到同一个队列中,然后再由消费者进行注意消费。

如果不追求消息一致性,可以使用上面那种取余的方式使用消息队列(负载均衡)

long index = orderId % mqs.size();
return mqs.get((int) index);

消费者代码

/**
* 订单消费者
*
* @author alsritter
* @version 1.0
**/
public class OrderConsumptionConsumer {
public static void main(String[] args) throws MQClientException {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.211.151:9876;192.168.211.152:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("OrderTopic", "*");

//4.注册消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("线程名称:【" + Thread.currentThread().getName() + "】:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

//5.启动消费者
consumer.start();
System.out.println("消费者启动");

}
}

注意看打印的线程,就算线程不同,队列依旧是有序的

延时消息

比如电商里,提交了一个订单就可以发送一个延时消息,1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

注意:现在 RocketMq 并不支持任意时间的延时,需要设置几个固定的延时等级,从 1s 到 2h 分别对应着等级 1 到 18

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

生产者 与之前普通消费的生产者代码几乎一样,只是多了句 msg.setDelayTimeLevel(2) 来控制延时间,这里的 Level 就是上面的那个 messageDelayLevel 对应的时间等级

生产者

public class Producer {

public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址
producer.setNamesrvAddr("192.168.211.151:9876;192.168.211.152:9876");
//3.启动producer
producer.start();

for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("DelayTopic", "Tag1", ("Hello World" + i).getBytes());
//设定延迟时间
msg.setDelayTimeLevel(2);
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus status = result.getSendStatus();

System.out.println("发送结果:" + result);

//线程睡1秒
TimeUnit.SECONDS.sleep(1);
}

//6.关闭生产者producer
producer.shutdown();
}

}

消费者

public class Consumer {

public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.211.151:9876;192.168.211.152:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("DelayTopic", "*");

//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {

//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消息ID:【" + msg.getMsgId() + "】,延迟时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();

System.out.println("消费者启动");
}
}

注意:可能因为网络延迟,机器卡顿等一些因素,消息可能不是及时送达的

批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过 4MB。

生产者代码

public class Producer {

public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址
producer.setNamesrvAddr("192.168.211.151:9876;192.168.211.152:9876");
//3.启动producer
producer.start();


List<Message> msgs = new ArrayList<Message>();


//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg1 = new Message("BatchTopic", "Tag1", ("Hello World" + 1).getBytes());
Message msg2 = new Message("BatchTopic", "Tag1", ("Hello World" + 2).getBytes());
Message msg3 = new Message("BatchTopic", "Tag1", ("Hello World" + 3).getBytes());

msgs.add(msg1);
msgs.add(msg2);
msgs.add(msg3);

//5.发送消息
SendResult result = producer.send(msgs);
//发送状态
SendStatus status = result.getSendStatus();

System.out.println("发送结果:" + result);

//线程睡1秒
TimeUnit.SECONDS.sleep(1);


//6.关闭生产者producer
producer.shutdown();
}

}

消息分割

如果消息的总长度可能大于 4MB 时,这时候最好把消息进行分割

编写一个迭代器,用来切割过大的消息列表

public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;

public ListSplitter(List<Message> messages) {
this.messages = messages;
}

@Override
public boolean hasNext() {
return currIndex < messages.size();
}

/**
* 重写迭代方法
*/
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}

tmpSize = tmpSize + 20; // 增加日志的开销 20 字节
if (tmpSize > SIZE_LIMIT) {
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}

List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}

使用这个工具类

// 把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
// 处理 error
}
}